Skip to content

Commit

Permalink
satellite/metainfo: support deleting specific object version
Browse files Browse the repository at this point in the history
Protobuf definition is ready to support deleting specific version of
object so we just need to wire requested version into metainfo
BeginDeleteObject endpoint.

Dependencies bumped to get latest metainfo protobuf definition.

#6221

Change-Id: Ifc3cc0b49d9acdf4f7e57e7184b0f2ae045f9113
  • Loading branch information
mniewrzal authored and Storj Robot committed Oct 25, 2023
1 parent 9338f3f commit 988ebba
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 8 deletions.
18 changes: 18 additions & 0 deletions satellite/metabase/common.go
Expand Up @@ -5,6 +5,7 @@ package metabase

import (
"database/sql/driver"
"encoding/binary"
"math"
"sort"
"strconv"
Expand Down Expand Up @@ -366,6 +367,23 @@ const PendingVersion = Version(0)
// Version in DB is represented as INT4.
const MaxVersion = Version(math.MaxInt32)

// Encode encodes version to bytes.
// TODO(ver): this is not final approach to version encoding. It's simplified
// version for internal testing purposes. Will be changed in future.
func (v Version) Encode() []byte {
var bytes [8]byte
binary.BigEndian.PutUint64(bytes[:], uint64(v))
return bytes[:]
}

// VersionFromBytes decodes version from bytes.
func VersionFromBytes(bytes []byte) (Version, error) {
if len(bytes) != 8 {
return Version(0), ErrInvalidRequest.New("invalid version")
}
return Version(binary.BigEndian.Uint64(bytes)), nil
}

// ObjectStatus defines the status that the object is in.
//
// There are two types of objects:
Expand Down
28 changes: 21 additions & 7 deletions satellite/metainfo/endpoint_object.go
Expand Up @@ -1258,6 +1258,10 @@ func (endpoint *Endpoint) BeginDeleteObject(ctx context.Context, req *pb.ObjectB
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}

if err := validateObjectVersion(req.ObjectVersion); err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}

var deletedObjects []*pb.Object

if req.GetStatus() == int32(metabase.Pending) {
Expand All @@ -1281,7 +1285,7 @@ func (endpoint *Endpoint) BeginDeleteObject(ctx context.Context, req *pb.ObjectB
}
}
} else {
deletedObjects, err = endpoint.DeleteCommittedObject(ctx, keyInfo.ProjectID, string(req.Bucket), metabase.ObjectKey(req.EncryptedObjectKey))
deletedObjects, err = endpoint.DeleteCommittedObject(ctx, keyInfo.ProjectID, string(req.Bucket), metabase.ObjectKey(req.EncryptedObjectKey), req.ObjectVersion)
}
if err != nil {
if !canRead && !canList {
Expand Down Expand Up @@ -1742,9 +1746,8 @@ func (endpoint *Endpoint) pendingObjectEntryToProtoListItem(ctx context.Context,
//
// NOTE: this method is exported for being able to individually test it without
// having import cycles.
// TODO: see note on DeleteObjectAnyStatus.
func (endpoint *Endpoint) DeleteCommittedObject(
ctx context.Context, projectID uuid.UUID, bucket string, object metabase.ObjectKey,
ctx context.Context, projectID uuid.UUID, bucket string, object metabase.ObjectKey, version []byte,
) (deletedObjects []*pb.Object, err error) {
defer mon.Task()(&ctx, projectID.String(), bucket, object)(&err)

Expand All @@ -1756,9 +1759,21 @@ func (endpoint *Endpoint) DeleteCommittedObject(

var result metabase.DeleteObjectResult
if endpoint.config.ServerSideCopy {
result, err = endpoint.metabase.DeleteObjectLastCommitted(ctx, metabase.DeleteObjectLastCommitted{
ObjectLocation: req,
})
if len(version) == 0 {
result, err = endpoint.metabase.DeleteObjectLastCommitted(ctx, metabase.DeleteObjectLastCommitted{
ObjectLocation: req,
})
} else {
var v metabase.Version
v, err = metabase.VersionFromBytes(version)
if err != nil {
return nil, err
}
result, err = endpoint.metabase.DeleteObjectExactVersion(ctx, metabase.DeleteObjectExactVersion{
ObjectLocation: req,
Version: v,
})
}
} else {
result, err = endpoint.metabase.DeleteObjectsAllVersions(ctx, metabase.DeleteObjectsAllVersions{Locations: []metabase.ObjectLocation{req}})
}
Expand All @@ -1785,7 +1800,6 @@ func (endpoint *Endpoint) DeleteCommittedObject(
//
// NOTE: this method is exported for being able to individually test it without
// having import cycles.
// TODO: see note on DeleteObjectAnyStatus.
func (endpoint *Endpoint) DeletePendingObject(ctx context.Context, stream metabase.ObjectStream, usePendingObjectTable bool) (deletedObjects []*pb.Object, err error) {
req := metabase.DeletePendingObject{
ObjectStream: stream,
Expand Down
62 changes: 61 additions & 1 deletion satellite/metainfo/endpoint_object_test.go
Expand Up @@ -689,6 +689,66 @@ func TestEndpoint_Object_No_StorageNodes(t *testing.T) {
}
})

t.Run("delete specific version", func(t *testing.T) {
defer ctx.Check(deleteBucket)

apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]

err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, "test-object", testrand.Bytes(100))
require.NoError(t, err)

// get encrypted object key and version
objects, err := planet.Satellites[0].Metabase.DB.TestingAllObjects(ctx)
require.NoError(t, err)

endpoint := planet.Satellites[0].Metainfo.Endpoint

// first try to delete not existing version
nonExistingVersion := objects[0].Version + 1
response, err := endpoint.BeginDeleteObject(ctx, &pb.BeginDeleteObjectRequest{
Header: &pb.RequestHeader{
ApiKey: apiKey.SerializeRaw(),
},
Bucket: []byte(bucketName),
EncryptedObjectKey: []byte(objects[0].ObjectKey),
ObjectVersion: nonExistingVersion.Encode(),
})
require.NoError(t, err)
require.Nil(t, response.Object)

// now delete using explicit version
response, err = endpoint.BeginDeleteObject(ctx, &pb.BeginDeleteObjectRequest{
Header: &pb.RequestHeader{
ApiKey: apiKey.SerializeRaw(),
},
Bucket: []byte(bucketName),
EncryptedObjectKey: []byte(objects[0].ObjectKey),
ObjectVersion: objects[0].Version.Encode(),
})
require.NoError(t, err)
require.NotNil(t, response.Object)
require.EqualValues(t, objects[0].ObjectKey, response.Object.EncryptedObjectKey)

err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, "test-object", testrand.Bytes(100))
require.NoError(t, err)

// now delete using empty version (latest version)
response, err = endpoint.BeginDeleteObject(ctx, &pb.BeginDeleteObjectRequest{
Header: &pb.RequestHeader{
ApiKey: apiKey.SerializeRaw(),
},
Bucket: []byte(bucketName),
EncryptedObjectKey: []byte(objects[0].ObjectKey),
ObjectVersion: nil,
})
require.NoError(t, err)
require.NotNil(t, response.Object)
require.EqualValues(t, objects[0].ObjectKey, response.Object.EncryptedObjectKey)

objects, err = planet.Satellites[0].Metabase.DB.TestingAllObjects(ctx)
require.NoError(t, err)
require.Empty(t, objects)
})
})
}

Expand Down Expand Up @@ -1973,7 +2033,7 @@ func TestEndpoint_DeleteCommittedObject(t *testing.T) {
deleteObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, encryptedKey string, streamID uuid.UUID) {
projectID := planet.Uplinks[0].Projects[0].ID

_, err := planet.Satellites[0].Metainfo.Endpoint.DeleteCommittedObject(ctx, projectID, bucket, metabase.ObjectKey(encryptedKey))
_, err := planet.Satellites[0].Metainfo.Endpoint.DeleteCommittedObject(ctx, projectID, bucket, metabase.ObjectKey(encryptedKey), []byte{})
require.NoError(t, err)
}
testDeleteObject(t, createObject, deleteObject)
Expand Down
7 changes: 7 additions & 0 deletions satellite/metainfo/validation.go
Expand Up @@ -303,6 +303,13 @@ func validateBucketLabel(label []byte) error {
return nil
}

func validateObjectVersion(version []byte) error {
if len(version) != 0 && len(version) != 8 {
return Error.New("invalid object version")
}
return nil
}

func isLowerLetter(r byte) bool {
return r >= 'a' && r <= 'z'
}
Expand Down

0 comments on commit 988ebba

Please sign in to comment.