Skip to content

Commit

Permalink
satellite/metabase: include part of stream id into encoded version
Browse files Browse the repository at this point in the history
We want to put StreamID into encoded version to validate object
uniqueness. This change only starts including StreamID into version.
Validation will be a separate change.

#6538

Change-Id: I541738a7a51576ba957dc485f61385e37948703b
  • Loading branch information
mniewrzal authored and Storj Robot committed Dec 18, 2023
1 parent 6925f87 commit 8b53066
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 39 deletions.
46 changes: 34 additions & 12 deletions satellite/metabase/common.go
Expand Up @@ -369,21 +369,43 @@ const PendingVersion = Version(0)
// Version in DB is represented as INT4.
const MaxVersion = Version(math.MaxInt32)

// Encode encodes version to bytes.
// TODO(ver): this is not final approach to version encoding. It's simplified
// version for internal testing purposes. Will be changed in future.
func (v Version) Encode() []byte {
var bytes [16]byte
binary.BigEndian.PutUint64(bytes[:], uint64(v))
return bytes[:]
// StreamVersionID represents combined Version and StreamID suffix for purposes of public API.
// First 8 bytes represents Version and rest are object StreamID suffix.
// TODO(ver): we may consider renaming this type to VersionID but to do that
// we would need to rename metabase.Version into metabase.SequenceNumber/metabase.Sequence to
// avoid confusion.
type StreamVersionID uuid.UUID

// Version returns Version encoded into stream version id.
func (s StreamVersionID) Version() Version {
return Version(binary.BigEndian.Uint64(s[:8]))
}

// StreamIDSuffix returns StreamID suffix encoded into stream version id.
func (s StreamVersionID) StreamIDSuffix() []byte {
return s[8:]
}

// Bytes returnes stream version id bytes.
func (s StreamVersionID) Bytes() []byte {
return s[:]
}

func newStreamVersionID(version Version, streamID uuid.UUID) StreamVersionID {
var sv StreamVersionID
binary.BigEndian.PutUint64(sv[:8], uint64(version))
copy(sv[8:], streamID[8:])
return sv
}

// VersionFromBytes decodes version from bytes.
func VersionFromBytes(bytes []byte) (Version, error) {
if len(bytes) != 16 {
return Version(0), ErrInvalidRequest.New("invalid version")
// StreamVersionIDFromBytes decodes stream version id from bytes.
func StreamVersionIDFromBytes(bytes []byte) (_ StreamVersionID, err error) {
if len(bytes) != len(StreamVersionID{}) {
return StreamVersionID{}, ErrInvalidRequest.New("invalid stream version id")
}
return Version(binary.BigEndian.Uint64(bytes)), nil
var sv StreamVersionID
copy(sv[:], bytes)
return sv, nil
}

// ObjectStatus defines the status that the object is in.
Expand Down
36 changes: 36 additions & 0 deletions satellite/metabase/common_test.go
Expand Up @@ -4,6 +4,7 @@
package metabase_test

import (
"math"
"strconv"
"testing"

Expand Down Expand Up @@ -720,6 +721,41 @@ func TestPiecesUpdate(t *testing.T) {
}
}

func TestStreamVersionID(t *testing.T) {
expectedVersion := metabase.Version(1)
expectedStreamID := uuid.UUID{2, 2, 2, 2, 2, 2, 2, 2, 4, 4, 4, 4, 4, 4, 4, 4}

object := metabase.Object{
ObjectStream: metabase.ObjectStream{
Version: expectedVersion,
StreamID: expectedStreamID,
},
}
encodedVersion := object.StreamVersionID().Bytes()
require.Equal(t, []byte{0, 0, 0, 0, 0, 0, 0, 1, 4, 4, 4, 4, 4, 4, 4, 4}, encodedVersion)

streamVersionID, err := metabase.StreamVersionIDFromBytes(encodedVersion)
require.NoError(t, err)
require.Equal(t, expectedVersion, streamVersionID.Version())
require.Equal(t, expectedStreamID[8:], streamVersionID.StreamIDSuffix())

expectedVersion = metabase.Version(testrand.Int63n(math.MaxInt64))
expectedStreamID = testrand.UUID()

object = metabase.Object{
ObjectStream: metabase.ObjectStream{
Version: expectedVersion,
StreamID: expectedStreamID,
},
}
encodedVersion = object.StreamVersionID().Bytes()

streamVersionID, err = metabase.StreamVersionIDFromBytes(encodedVersion)
require.NoError(t, err)
require.Equal(t, expectedVersion, streamVersionID.Version())
require.Equal(t, expectedStreamID[8:], streamVersionID.StreamIDSuffix())
}

func BenchmarkSegmentPieceSize(b *testing.B) {
segment := metabase.Segment{
EncryptedSize: 64 * memory.MiB.Int32(),
Expand Down
5 changes: 5 additions & 0 deletions satellite/metabase/get.go
Expand Up @@ -30,6 +30,11 @@ func (obj *Object) IsMigrated() bool {
return obj.TotalPlainSize <= 0
}

// StreamVersionID returns byte representation of object stream version id.
func (obj *Object) StreamVersionID() StreamVersionID {
return newStreamVersionID(obj.Version, obj.StreamID)
}

// Segment segment metadata.
// TODO define separated struct.
type Segment RawSegment
Expand Down
5 changes: 5 additions & 0 deletions satellite/metabase/list.go
Expand Up @@ -36,6 +36,11 @@ type ObjectEntry struct {
Encryption storj.EncryptionParameters
}

// StreamVersionID returns byte representation of object stream version id.
func (entry ObjectEntry) StreamVersionID() StreamVersionID {
return newStreamVersionID(entry.Version, entry.StreamID)
}

// ObjectsIterator iterates over a sequence of ObjectEntry items.
type ObjectsIterator interface {
Next(ctx context.Context, item *ObjectEntry) bool
Expand Down
26 changes: 13 additions & 13 deletions satellite/metainfo/endpoint_object.go
Expand Up @@ -378,14 +378,14 @@ func (endpoint *Endpoint) GetObject(ctx context.Context, req *pb.ObjectGetReques
ObjectLocation: objectLocation,
})
} else {
var v metabase.Version
v, err = metabase.VersionFromBytes(req.ObjectVersion)
var sv metabase.StreamVersionID
sv, err = metabase.StreamVersionIDFromBytes(req.ObjectVersion)
if err != nil {
return nil, endpoint.convertMetabaseErr(err)
}
mbObject, err = endpoint.metabase.GetObjectExactVersion(ctx, metabase.GetObjectExactVersion{
ObjectLocation: objectLocation,
Version: v,
Version: sv.Version(),
})
}
if err != nil {
Expand Down Expand Up @@ -516,8 +516,8 @@ func (endpoint *Endpoint) DownloadObject(ctx context.Context, req *pb.ObjectDown
},
})
} else {
var v metabase.Version
v, err = metabase.VersionFromBytes(req.ObjectVersion)
var sv metabase.StreamVersionID
sv, err = metabase.StreamVersionIDFromBytes(req.ObjectVersion)
if err != nil {
return nil, endpoint.convertMetabaseErr(err)
}
Expand All @@ -527,7 +527,7 @@ func (endpoint *Endpoint) DownloadObject(ctx context.Context, req *pb.ObjectDown
BucketName: string(req.Bucket),
ObjectKey: metabase.ObjectKey(req.EncryptedObjectKey),
},
Version: v,
Version: sv.Version(),
})
}
if err != nil {
Expand Down Expand Up @@ -960,11 +960,11 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq
}

if len(req.VersionCursor) != 0 {
version, err := metabase.VersionFromBytes(req.VersionCursor)
sv, err := metabase.StreamVersionIDFromBytes(req.VersionCursor)
if err != nil {
return nil, endpoint.convertMetabaseErr(err)
}
cursorVersion = version
cursorVersion = sv.Version()
}

includeCustomMetadata := true
Expand Down Expand Up @@ -1488,7 +1488,7 @@ func (endpoint *Endpoint) objectToProto(ctx context.Context, object metabase.Obj
Bucket: []byte(object.BucketName),
EncryptedObjectKey: []byte(object.ObjectKey),
Version: int32(object.Version), // TODO incompatible types
ObjectVersion: object.Version.Encode(),
ObjectVersion: object.StreamVersionID().Bytes(),
StreamId: streamID,
Status: pb.Object_Status(object.Status),
ExpiresAt: expires,
Expand Down Expand Up @@ -1519,7 +1519,7 @@ func (endpoint *Endpoint) objectEntryToProtoListItem(ctx context.Context, bucket
EncryptedObjectKey: []byte(entry.ObjectKey),
Version: int32(entry.Version), // TODO incompatible types
Status: pb.Object_Status(entry.Status),
ObjectVersion: entry.Version.Encode(),
ObjectVersion: entry.StreamVersionID().Bytes(),
}

expiresAt := time.Time{}
Expand Down Expand Up @@ -1639,14 +1639,14 @@ func (endpoint *Endpoint) DeleteCommittedObject(
Suspended: suspended,
})
} else {
var v metabase.Version
v, err = metabase.VersionFromBytes(version)
var sv metabase.StreamVersionID
sv, err = metabase.StreamVersionIDFromBytes(version)
if err != nil {
return nil, err
}
result, err = endpoint.metabase.DeleteObjectExactVersion(ctx, metabase.DeleteObjectExactVersion{
ObjectLocation: req,
Version: v,
Version: sv.Version(),
})
}
if err != nil {
Expand Down
29 changes: 15 additions & 14 deletions satellite/metainfo/endpoint_object_test.go
Expand Up @@ -136,17 +136,16 @@ func TestEndpoint_Object_No_StorageNodes(t *testing.T) {
require.Equal(t, item.EncryptedObjectKey, response.Object.EncryptedObjectKey)

// get with WRONG version, should return error
version, err := metabase.VersionFromBytes(response.Object.ObjectVersion)
require.NoError(t, err)
version++
nonExistingVersion := version.Encode()
object := metabase.Object{}
object.Version = metabase.Version(response.Object.Version) + 1
copy(object.StreamID[:], response.Object.StreamId[:])
_, err = satellite.Metainfo.Endpoint.GetObject(ctx, &pb.GetObjectRequest{
Header: &pb.RequestHeader{
ApiKey: apiKey.SerializeRaw(),
},
Bucket: []byte(expectedBucketName),
EncryptedObjectKey: item.EncryptedObjectKey,
ObjectVersion: nonExistingVersion,
ObjectVersion: object.StreamVersionID().Bytes(),
})
require.True(t, errs2.IsRPC(err, rpcstatus.NotFound))
}
Expand Down Expand Up @@ -734,27 +733,28 @@ func TestEndpoint_Object_No_StorageNodes(t *testing.T) {
require.NoError(t, err)
require.EqualValues(t, committedObject.BucketName, downloadObjectResponse.Object.Bucket)
require.EqualValues(t, committedObject.ObjectKey, downloadObjectResponse.Object.EncryptedObjectKey)
require.EqualValues(t, committedObject.Version.Encode(), downloadObjectResponse.Object.ObjectVersion)
require.EqualValues(t, committedObject.StreamVersionID().Bytes(), downloadObjectResponse.Object.ObjectVersion)

// download using explicit version
downloadObjectResponse, err = satellite.API.Metainfo.Endpoint.DownloadObject(ctx, &pb.ObjectDownloadRequest{
Header: &pb.RequestHeader{ApiKey: apiKey.SerializeRaw()},
Bucket: []byte("testbucket"),
EncryptedObjectKey: []byte(committedObject.ObjectKey),
ObjectVersion: committedObject.Version.Encode(),
ObjectVersion: committedObject.StreamVersionID().Bytes(),
})
require.NoError(t, err)
require.EqualValues(t, committedObject.BucketName, downloadObjectResponse.Object.Bucket)
require.EqualValues(t, committedObject.ObjectKey, downloadObjectResponse.Object.EncryptedObjectKey)
require.EqualValues(t, committedObject.Version.Encode(), downloadObjectResponse.Object.ObjectVersion)
require.EqualValues(t, committedObject.StreamVersionID().Bytes(), downloadObjectResponse.Object.ObjectVersion)

// download using NON EXISTING version
nonExistingVersion := committedObject.Version + 1
nonExistingObject := committedObject
nonExistingObject.Version++
_, err = satellite.API.Metainfo.Endpoint.DownloadObject(ctx, &pb.ObjectDownloadRequest{
Header: &pb.RequestHeader{ApiKey: apiKey.SerializeRaw()},
Bucket: []byte("testbucket"),
EncryptedObjectKey: []byte(committedObject.ObjectKey),
ObjectVersion: nonExistingVersion.Encode(),
ObjectVersion: nonExistingObject.StreamVersionID().Bytes(),
})
require.True(t, errs2.IsRPC(err, rpcstatus.NotFound))
})
Expand All @@ -774,14 +774,15 @@ func TestEndpoint_Object_No_StorageNodes(t *testing.T) {
endpoint := planet.Satellites[0].Metainfo.Endpoint

// first try to delete not existing version
nonExistingVersion := objects[0].Version + 1
nonExistingObject := objects[0]
nonExistingObject.Version++
response, err := endpoint.BeginDeleteObject(ctx, &pb.BeginDeleteObjectRequest{
Header: &pb.RequestHeader{
ApiKey: apiKey.SerializeRaw(),
},
Bucket: []byte(bucketName),
EncryptedObjectKey: []byte(objects[0].ObjectKey),
ObjectVersion: nonExistingVersion.Encode(),
ObjectVersion: nonExistingObject.StreamVersionID().Bytes(),
})
require.NoError(t, err)
require.Nil(t, response.Object)
Expand All @@ -793,7 +794,7 @@ func TestEndpoint_Object_No_StorageNodes(t *testing.T) {
},
Bucket: []byte(bucketName),
EncryptedObjectKey: []byte(objects[0].ObjectKey),
ObjectVersion: objects[0].Version.Encode(),
ObjectVersion: objects[0].StreamVersionID().Bytes(),
})
require.NoError(t, err)
require.NotNil(t, response.Object)
Expand Down Expand Up @@ -1265,7 +1266,7 @@ func TestEndpoint_Object_With_StorageNodes(t *testing.T) {
allObjects, err := planet.Satellites[0].Metabase.DB.TestingAllCommittedObjects(ctx, project.ID, object.Bucket)
require.NoError(t, err)
require.Len(t, allObjects, 1)
require.Equal(t, listResponse.Items[0].ObjectVersion, allObjects[0].Version.Encode())
require.Equal(t, listResponse.Items[0].ObjectVersion, allObjects[0].StreamVersionID().Bytes())
})

t.Run("get object IP", func(t *testing.T) {
Expand Down

0 comments on commit 8b53066

Please sign in to comment.