diff --git a/satellite/metabase/common.go b/satellite/metabase/common.go index 729df7101f23..06dd0ce98169 100644 --- a/satellite/metabase/common.go +++ b/satellite/metabase/common.go @@ -369,21 +369,43 @@ const PendingVersion = Version(0) // Version in DB is represented as INT4. const MaxVersion = Version(math.MaxInt32) -// Encode encodes version to bytes. -// TODO(ver): this is not final approach to version encoding. It's simplified -// version for internal testing purposes. Will be changed in future. -func (v Version) Encode() []byte { - var bytes [16]byte - binary.BigEndian.PutUint64(bytes[:], uint64(v)) - return bytes[:] +// StreamVersionID represents combined Version and StreamID suffix for purposes of public API. +// First 8 bytes represents Version and rest are object StreamID suffix. +// TODO(ver): we may consider renaming this type to VersionID but to do that +// we would need to rename metabase.Version into metabase.SequenceNumber/metabase.Sequence to +// avoid confusion. +type StreamVersionID uuid.UUID + +// Version returns Version encoded into stream version id. +func (s StreamVersionID) Version() Version { + return Version(binary.BigEndian.Uint64(s[:8])) +} + +// StreamIDSuffix returns StreamID suffix encoded into stream version id. +func (s StreamVersionID) StreamIDSuffix() []byte { + return s[8:] +} + +// Bytes returnes stream version id bytes. +func (s StreamVersionID) Bytes() []byte { + return s[:] +} + +func newStreamVersionID(version Version, streamID uuid.UUID) StreamVersionID { + var sv StreamVersionID + binary.BigEndian.PutUint64(sv[:8], uint64(version)) + copy(sv[8:], streamID[8:]) + return sv } -// VersionFromBytes decodes version from bytes. -func VersionFromBytes(bytes []byte) (Version, error) { - if len(bytes) != 16 { - return Version(0), ErrInvalidRequest.New("invalid version") +// StreamVersionIDFromBytes decodes stream version id from bytes. +func StreamVersionIDFromBytes(bytes []byte) (_ StreamVersionID, err error) { + if len(bytes) != len(StreamVersionID{}) { + return StreamVersionID{}, ErrInvalidRequest.New("invalid stream version id") } - return Version(binary.BigEndian.Uint64(bytes)), nil + var sv StreamVersionID + copy(sv[:], bytes) + return sv, nil } // ObjectStatus defines the status that the object is in. diff --git a/satellite/metabase/common_test.go b/satellite/metabase/common_test.go index edddaba6ec18..9ac7d797a906 100644 --- a/satellite/metabase/common_test.go +++ b/satellite/metabase/common_test.go @@ -4,6 +4,7 @@ package metabase_test import ( + "math" "strconv" "testing" @@ -720,6 +721,41 @@ func TestPiecesUpdate(t *testing.T) { } } +func TestStreamVersionID(t *testing.T) { + expectedVersion := metabase.Version(1) + expectedStreamID := uuid.UUID{2, 2, 2, 2, 2, 2, 2, 2, 4, 4, 4, 4, 4, 4, 4, 4} + + object := metabase.Object{ + ObjectStream: metabase.ObjectStream{ + Version: expectedVersion, + StreamID: expectedStreamID, + }, + } + encodedVersion := object.StreamVersionID().Bytes() + require.Equal(t, []byte{0, 0, 0, 0, 0, 0, 0, 1, 4, 4, 4, 4, 4, 4, 4, 4}, encodedVersion) + + streamVersionID, err := metabase.StreamVersionIDFromBytes(encodedVersion) + require.NoError(t, err) + require.Equal(t, expectedVersion, streamVersionID.Version()) + require.Equal(t, expectedStreamID[8:], streamVersionID.StreamIDSuffix()) + + expectedVersion = metabase.Version(testrand.Int63n(math.MaxInt64)) + expectedStreamID = testrand.UUID() + + object = metabase.Object{ + ObjectStream: metabase.ObjectStream{ + Version: expectedVersion, + StreamID: expectedStreamID, + }, + } + encodedVersion = object.StreamVersionID().Bytes() + + streamVersionID, err = metabase.StreamVersionIDFromBytes(encodedVersion) + require.NoError(t, err) + require.Equal(t, expectedVersion, streamVersionID.Version()) + require.Equal(t, expectedStreamID[8:], streamVersionID.StreamIDSuffix()) +} + func BenchmarkSegmentPieceSize(b *testing.B) { segment := metabase.Segment{ EncryptedSize: 64 * memory.MiB.Int32(), diff --git a/satellite/metabase/get.go b/satellite/metabase/get.go index 3d25a5cde326..dc787a1c145a 100644 --- a/satellite/metabase/get.go +++ b/satellite/metabase/get.go @@ -30,6 +30,11 @@ func (obj *Object) IsMigrated() bool { return obj.TotalPlainSize <= 0 } +// StreamVersionID returns byte representation of object stream version id. +func (obj *Object) StreamVersionID() StreamVersionID { + return newStreamVersionID(obj.Version, obj.StreamID) +} + // Segment segment metadata. // TODO define separated struct. type Segment RawSegment diff --git a/satellite/metabase/list.go b/satellite/metabase/list.go index 71e583ce4a02..932dc40d685f 100644 --- a/satellite/metabase/list.go +++ b/satellite/metabase/list.go @@ -36,6 +36,11 @@ type ObjectEntry struct { Encryption storj.EncryptionParameters } +// StreamVersionID returns byte representation of object stream version id. +func (entry ObjectEntry) StreamVersionID() StreamVersionID { + return newStreamVersionID(entry.Version, entry.StreamID) +} + // ObjectsIterator iterates over a sequence of ObjectEntry items. type ObjectsIterator interface { Next(ctx context.Context, item *ObjectEntry) bool diff --git a/satellite/metainfo/endpoint_object.go b/satellite/metainfo/endpoint_object.go index fceb727a731c..ec017db88b07 100644 --- a/satellite/metainfo/endpoint_object.go +++ b/satellite/metainfo/endpoint_object.go @@ -378,14 +378,14 @@ func (endpoint *Endpoint) GetObject(ctx context.Context, req *pb.ObjectGetReques ObjectLocation: objectLocation, }) } else { - var v metabase.Version - v, err = metabase.VersionFromBytes(req.ObjectVersion) + var sv metabase.StreamVersionID + sv, err = metabase.StreamVersionIDFromBytes(req.ObjectVersion) if err != nil { return nil, endpoint.convertMetabaseErr(err) } mbObject, err = endpoint.metabase.GetObjectExactVersion(ctx, metabase.GetObjectExactVersion{ ObjectLocation: objectLocation, - Version: v, + Version: sv.Version(), }) } if err != nil { @@ -516,8 +516,8 @@ func (endpoint *Endpoint) DownloadObject(ctx context.Context, req *pb.ObjectDown }, }) } else { - var v metabase.Version - v, err = metabase.VersionFromBytes(req.ObjectVersion) + var sv metabase.StreamVersionID + sv, err = metabase.StreamVersionIDFromBytes(req.ObjectVersion) if err != nil { return nil, endpoint.convertMetabaseErr(err) } @@ -527,7 +527,7 @@ func (endpoint *Endpoint) DownloadObject(ctx context.Context, req *pb.ObjectDown BucketName: string(req.Bucket), ObjectKey: metabase.ObjectKey(req.EncryptedObjectKey), }, - Version: v, + Version: sv.Version(), }) } if err != nil { @@ -960,11 +960,11 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq } if len(req.VersionCursor) != 0 { - version, err := metabase.VersionFromBytes(req.VersionCursor) + sv, err := metabase.StreamVersionIDFromBytes(req.VersionCursor) if err != nil { return nil, endpoint.convertMetabaseErr(err) } - cursorVersion = version + cursorVersion = sv.Version() } includeCustomMetadata := true @@ -1488,7 +1488,7 @@ func (endpoint *Endpoint) objectToProto(ctx context.Context, object metabase.Obj Bucket: []byte(object.BucketName), EncryptedObjectKey: []byte(object.ObjectKey), Version: int32(object.Version), // TODO incompatible types - ObjectVersion: object.Version.Encode(), + ObjectVersion: object.StreamVersionID().Bytes(), StreamId: streamID, Status: pb.Object_Status(object.Status), ExpiresAt: expires, @@ -1519,7 +1519,7 @@ func (endpoint *Endpoint) objectEntryToProtoListItem(ctx context.Context, bucket EncryptedObjectKey: []byte(entry.ObjectKey), Version: int32(entry.Version), // TODO incompatible types Status: pb.Object_Status(entry.Status), - ObjectVersion: entry.Version.Encode(), + ObjectVersion: entry.StreamVersionID().Bytes(), } expiresAt := time.Time{} @@ -1639,14 +1639,14 @@ func (endpoint *Endpoint) DeleteCommittedObject( Suspended: suspended, }) } else { - var v metabase.Version - v, err = metabase.VersionFromBytes(version) + var sv metabase.StreamVersionID + sv, err = metabase.StreamVersionIDFromBytes(version) if err != nil { return nil, err } result, err = endpoint.metabase.DeleteObjectExactVersion(ctx, metabase.DeleteObjectExactVersion{ ObjectLocation: req, - Version: v, + Version: sv.Version(), }) } if err != nil { diff --git a/satellite/metainfo/endpoint_object_test.go b/satellite/metainfo/endpoint_object_test.go index b5a36492741e..c09a94a42ca5 100644 --- a/satellite/metainfo/endpoint_object_test.go +++ b/satellite/metainfo/endpoint_object_test.go @@ -136,17 +136,16 @@ func TestEndpoint_Object_No_StorageNodes(t *testing.T) { require.Equal(t, item.EncryptedObjectKey, response.Object.EncryptedObjectKey) // get with WRONG version, should return error - version, err := metabase.VersionFromBytes(response.Object.ObjectVersion) - require.NoError(t, err) - version++ - nonExistingVersion := version.Encode() + object := metabase.Object{} + object.Version = metabase.Version(response.Object.Version) + 1 + copy(object.StreamID[:], response.Object.StreamId[:]) _, err = satellite.Metainfo.Endpoint.GetObject(ctx, &pb.GetObjectRequest{ Header: &pb.RequestHeader{ ApiKey: apiKey.SerializeRaw(), }, Bucket: []byte(expectedBucketName), EncryptedObjectKey: item.EncryptedObjectKey, - ObjectVersion: nonExistingVersion, + ObjectVersion: object.StreamVersionID().Bytes(), }) require.True(t, errs2.IsRPC(err, rpcstatus.NotFound)) } @@ -734,27 +733,28 @@ func TestEndpoint_Object_No_StorageNodes(t *testing.T) { require.NoError(t, err) require.EqualValues(t, committedObject.BucketName, downloadObjectResponse.Object.Bucket) require.EqualValues(t, committedObject.ObjectKey, downloadObjectResponse.Object.EncryptedObjectKey) - require.EqualValues(t, committedObject.Version.Encode(), downloadObjectResponse.Object.ObjectVersion) + require.EqualValues(t, committedObject.StreamVersionID().Bytes(), downloadObjectResponse.Object.ObjectVersion) // download using explicit version downloadObjectResponse, err = satellite.API.Metainfo.Endpoint.DownloadObject(ctx, &pb.ObjectDownloadRequest{ Header: &pb.RequestHeader{ApiKey: apiKey.SerializeRaw()}, Bucket: []byte("testbucket"), EncryptedObjectKey: []byte(committedObject.ObjectKey), - ObjectVersion: committedObject.Version.Encode(), + ObjectVersion: committedObject.StreamVersionID().Bytes(), }) require.NoError(t, err) require.EqualValues(t, committedObject.BucketName, downloadObjectResponse.Object.Bucket) require.EqualValues(t, committedObject.ObjectKey, downloadObjectResponse.Object.EncryptedObjectKey) - require.EqualValues(t, committedObject.Version.Encode(), downloadObjectResponse.Object.ObjectVersion) + require.EqualValues(t, committedObject.StreamVersionID().Bytes(), downloadObjectResponse.Object.ObjectVersion) // download using NON EXISTING version - nonExistingVersion := committedObject.Version + 1 + nonExistingObject := committedObject + nonExistingObject.Version++ _, err = satellite.API.Metainfo.Endpoint.DownloadObject(ctx, &pb.ObjectDownloadRequest{ Header: &pb.RequestHeader{ApiKey: apiKey.SerializeRaw()}, Bucket: []byte("testbucket"), EncryptedObjectKey: []byte(committedObject.ObjectKey), - ObjectVersion: nonExistingVersion.Encode(), + ObjectVersion: nonExistingObject.StreamVersionID().Bytes(), }) require.True(t, errs2.IsRPC(err, rpcstatus.NotFound)) }) @@ -774,14 +774,15 @@ func TestEndpoint_Object_No_StorageNodes(t *testing.T) { endpoint := planet.Satellites[0].Metainfo.Endpoint // first try to delete not existing version - nonExistingVersion := objects[0].Version + 1 + nonExistingObject := objects[0] + nonExistingObject.Version++ response, err := endpoint.BeginDeleteObject(ctx, &pb.BeginDeleteObjectRequest{ Header: &pb.RequestHeader{ ApiKey: apiKey.SerializeRaw(), }, Bucket: []byte(bucketName), EncryptedObjectKey: []byte(objects[0].ObjectKey), - ObjectVersion: nonExistingVersion.Encode(), + ObjectVersion: nonExistingObject.StreamVersionID().Bytes(), }) require.NoError(t, err) require.Nil(t, response.Object) @@ -793,7 +794,7 @@ func TestEndpoint_Object_No_StorageNodes(t *testing.T) { }, Bucket: []byte(bucketName), EncryptedObjectKey: []byte(objects[0].ObjectKey), - ObjectVersion: objects[0].Version.Encode(), + ObjectVersion: objects[0].StreamVersionID().Bytes(), }) require.NoError(t, err) require.NotNil(t, response.Object) @@ -1265,7 +1266,7 @@ func TestEndpoint_Object_With_StorageNodes(t *testing.T) { allObjects, err := planet.Satellites[0].Metabase.DB.TestingAllCommittedObjects(ctx, project.ID, object.Bucket) require.NoError(t, err) require.Len(t, allObjects, 1) - require.Equal(t, listResponse.Items[0].ObjectVersion, allObjects[0].Version.Encode()) + require.Equal(t, listResponse.Items[0].ObjectVersion, allObjects[0].StreamVersionID().Bytes()) }) t.Run("get object IP", func(t *testing.T) {